fix: prevent CLI session message loss and Telegram policy bypass#1885
Conversation
|
@coderabbitai review |
|
/review |
Qodo reviews are paused for this user.Troubleshooting steps vary by plan Learn more → On a Teams plan? Using GitHub Enterprise Server, GitLab Self-Managed, or Bitbucket Data Center? |
✅ Action performedReview finished.
|
|
Important Review skippedBot user detected. To trigger a single review, invoke the ⚙️ Run configurationConfiguration used: defaults Review profile: CHILL Plan: Pro Run ID: You can disable this status message by setting the Use the checkbox below for a quick retry:
📝 WalkthroughWalkthroughTwo independent features are introduced: (1) Telegram bot security handler now conditions explicit user allowlisting on non-empty allowed_users config with comprehensive test updates; (2) UnifiedSessionStore adds concurrent-safe persistence via read-modify-write merging with cross-platform file locking and mtime tracking. ChangesTelegram Unknown User Policy
Session Store Concurrent-Safe Persistence
Estimated code review effort🎯 4 (Complex) | ⏱️ ~50 minutes Possibly related PRs
Suggested reviewers
Poem
🚥 Pre-merge checks | ✅ 5✅ Passed checks (5 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
|
@copilot Do a thorough review of this PR. Read ALL existing reviewer comments above from Qodo, Coderabbit, and Gemini first — incorporate their findings. Review areas:
|
There was a problem hiding this comment.
Actionable comments posted: 3
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@src/praisonai/praisonai/cli/session/unified.py`:
- Around line 302-303: The cache mtime update uses path.stat() which can fail if
the file is unlinked or replaced concurrently; in the methods load() and save()
(look for uses around session.session_id and self._cache_mtime), either read the
mtime from the already-open file descriptor using os.fstat(fd) (preferable when
you have the open file object) or wrap the path.stat() call in a try/except that
catches FileNotFoundError and skips/ignores updating self._cache_mtime for that
session; update both occurrences (the block setting
self._cache_mtime[session.session_id] near the shown lines and the similar code
at 330-332) so they use the same safe approach.
- Around line 213-220: The current merge uses max() semantics on cumulative
counters (merged vs incoming) which drops concurrent increments; instead compute
and apply deltas from each writer's loaded baseline: for each counter
(total_input_tokens, total_output_tokens, total_cost, request_count) replace the
max logic with merged.<counter> += max(0, incoming.<counter> -
incoming.<counter>_base) where incoming.<counter>_base is the value the writer
originally loaded (or, if your model persists per-write increments, use
incoming.<counter>_delta directly); update the merge site handling (variables
merged and incoming) to expect and use these base or delta fields so concurrent
increments are summed rather than overwritten.
- Around line 151-156: The current merge in unified.py (the loop using zip over
left/right and the subsequent reconstruction disk_session.messages +
incoming.messages[prefix:]) treats messages identical iff role/content match,
which loses deletions (shorter histories) and collapses same-text concurrent
turns; replace this with an explicit per-message identity/revision strategy: add
or use a stable message_id (or combine timestamp + origin) and a
revision/updated_at field on message objects, then update the merge logic in the
functions using the zip/prefix pattern (the loop around prefix and the block at
lines ~209-212) to compare message_id/revision/updated_at first, detect
deletions (when disk has messages not present in incoming) and updates (incoming
revision newer -> replace), and append only truly new messages; ensure same-text
different-turns are distinguished by message_id or timestamp so they are not
collapsed.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: 60030260-4d26-4d44-91fb-f6370363b9db
📒 Files selected for processing (4)
src/praisonai/praisonai/bots/telegram.pysrc/praisonai/praisonai/cli/session/unified.pysrc/praisonai/tests/unit/cli/test_unified_session.pysrc/praisonai/tests/unit/gateway/test_telegram_security_pipeline.py
| for left_msg, right_msg in zip(left, right): | ||
| if left_msg.get("role") != right_msg.get("role"): | ||
| break | ||
| if left_msg.get("content") != right_msg.get("content"): | ||
| break | ||
| prefix += 1 |
There was a problem hiding this comment.
The message merge is still lossy outside the simple append case.
This logic only treats role/content as message identity and always reconstructs disk_session.messages + incoming.messages[prefix:]. That leaves two deterministic data-loss paths: clear_messages() can never persist because shorter histories are ignored, and two concurrent same-text turns can collapse into one because their timestamps are not part of the prefix match. This needs an explicit message identity / revisioning strategy instead of append-only prefix matching.
Also applies to: 209-212
🧰 Tools
🪛 Ruff (0.15.15)
[warning] 151-151: zip() without an explicit strict= parameter
Add explicit value for parameter strict=
(B905)
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@src/praisonai/praisonai/cli/session/unified.py` around lines 151 - 156, The
current merge in unified.py (the loop using zip over left/right and the
subsequent reconstruction disk_session.messages + incoming.messages[prefix:])
treats messages identical iff role/content match, which loses deletions (shorter
histories) and collapses same-text concurrent turns; replace this with an
explicit per-message identity/revision strategy: add or use a stable message_id
(or combine timestamp + origin) and a revision/updated_at field on message
objects, then update the merge logic in the functions using the zip/prefix
pattern (the loop around prefix and the block at lines ~209-212) to compare
message_id/revision/updated_at first, detect deletions (when disk has messages
not present in incoming) and updates (incoming revision newer -> replace), and
append only truly new messages; ensure same-text different-turns are
distinguished by message_id or timestamp so they are not collapsed.
| if incoming.total_input_tokens > merged.total_input_tokens: | ||
| merged.total_input_tokens = incoming.total_input_tokens | ||
| if incoming.total_output_tokens > merged.total_output_tokens: | ||
| merged.total_output_tokens = incoming.total_output_tokens | ||
| if incoming.total_cost > merged.total_cost: | ||
| merged.total_cost = incoming.total_cost | ||
| if incoming.request_count > merged.request_count: | ||
| merged.request_count = incoming.request_count |
There was a problem hiding this comment.
max() loses concurrent counter deltas.
These fields are cumulative totals, not last-writer-wins scalars. If two stores both load request_count == 5, each increment to 6, and save, this merge keeps 6 instead of 7; token and cost totals undercount the same way. To make the store truly concurrent-safe, merge the delta from each writer’s loaded baseline (or persist per-write increments separately) rather than taking the larger absolute value.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@src/praisonai/praisonai/cli/session/unified.py` around lines 213 - 220, The
current merge uses max() semantics on cumulative counters (merged vs incoming)
which drops concurrent increments; instead compute and apply deltas from each
writer's loaded baseline: for each counter (total_input_tokens,
total_output_tokens, total_cost, request_count) replace the max logic with
merged.<counter> += max(0, incoming.<counter> - incoming.<counter>_base) where
incoming.<counter>_base is the value the writer originally loaded (or, if your
model persists per-write increments, use incoming.<counter>_delta directly);
update the merge site handling (variables merged and incoming) to expect and use
these base or delta fields so concurrent increments are summed rather than
overwritten.
| if path.exists(): | ||
| self._cache_mtime[session.session_id] = path.stat().st_mtime |
There was a problem hiding this comment.
path.stat() needs to tolerate concurrent unlink/replace.
After the descriptor is closed, another process can delete or swap the session file before these calls run. In load(), that can turn a successful read into FileNotFoundError; in save(), it can make a completed write bubble up as a failure. Read the mtime from the open fd (os.fstat(...)) or guard these cache updates with FileNotFoundError.
Also applies to: 330-332
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@src/praisonai/praisonai/cli/session/unified.py` around lines 302 - 303, The
cache mtime update uses path.stat() which can fail if the file is unlinked or
replaced concurrently; in the methods load() and save() (look for uses around
session.session_id and self._cache_mtime), either read the mtime from the
already-open file descriptor using os.fstat(fd) (preferable when you have the
open file object) or wrap the path.stat() call in a try/except that catches
FileNotFoundError and skips/ignores updating self._cache_mtime for that session;
update both occurrences (the block setting self._cache_mtime[session.session_id]
near the shown lines and the similar code at 330-332) so they use the same safe
approach.
|
@claude You are the FINAL architecture reviewer. If the branch is under MervinPraison/PraisonAI (not a fork), you are able to make modifications to this branch and push directly. SCOPE: Focus ONLY on Python packages (praisonaiagents, praisonai). Do NOT modify praisonai-rust or praisonai-ts. Read ALL comments above from Gemini, Qodo, CodeRabbit, and Copilot carefully before responding. Phase 1: Review per AGENTS.md
Phase 2: FIX Valid Issues Phase 3: Final Verdict |
|
Claude finished @MervinPraison's task —— View job Final Architecture Review Complete ✅Phase 1: Architecture Review per AGENTS.md
Phase 2: Address Valid Issues from Previous Reviewers
Phase 3: Final Verdict
✅ APPROVED - Critical Issues FixedArchitecture Compliance✅ Protocol-driven: Changes correctly isolated to wrapper layer (praisonai package) Issues Addressed🔧 Fixed by Me:
📝 Reviewed & Validated: Tests Passing
Files ModifiedFixed:
Skipped: No changes needed to other files - original implementation was sound Final Verdict: APPROVEThis PR correctly fixes critical concurrency bugs while maintaining the sound architectural decisions of the original implementation. The Telegram security fix closes a real auth bypass. Changes are minimal, focused, and properly tested. |
|
Merge review — blocked
Fix list:
|
|
Claude finished @MervinPraison's task —— View job Merge Conflict Resolution ✅Tasks:
✅ Rebase Successfully CompletedResolved Conflicts:
Key Improvements Preserved:
Ready for Merge: All conflicts resolved, syntax validated, changes pushed to branch |
- UnifiedSessionStore: merge messages under file lock on save to avoid clobbering concurrent writes from TUI/interactive processes; always reload from disk on load - Telegram: align user allowlist check with Discord/Slack so empty allowed_users honours unknown_user_policy (default deny) - Add regression tests for both issues Co-authored-by: Mervin Praison <MervinPraison@users.noreply.github.com>
- Fix counter delta loss: Use delta tracking instead of max() to preserve concurrent increments - Fix file stat race conditions: Handle FileNotFoundError when files are deleted between operations - Add baseline stat tracking for proper merge semantics - Maintain backward compatibility for existing sessions Addresses CodeRabbit feedback on concurrent session safety issues. Co-authored-by: Mervin Praison <MervinPraison@users.noreply.github.com>
7979fcf to
94cfcf7
Compare
Qodo reviews are paused for this user.Troubleshooting steps vary by plan Learn more → On a Teams plan? Using GitHub Enterprise Server, GitLab Self-Managed, or Bitbucket Data Center? |
Greptile SummaryThis PR addresses two real defects: a Telegram
Confidence Score: 3/5The Telegram fix is safe to merge as-is; the session store refactor has a logic error in the stat-merge path that causes token/cost counters to be silently discarded on every save that hits an existing file. The delta-tracking scheme introduced for stats is broken because set_baseline_stats() is called on the incoming session immediately before _merge_sessions(), resetting the baseline to current values and making every computed delta zero. Any token counts or cost accumulated since the last load() are dropped silently on each save. The message-merge path (prefix-based) is unaffected and works correctly, which is why the new test passes. A separate dead-code method (_read_session_from_file) was added but is never invoked. src/praisonai/praisonai/cli/session/unified.py — specifically the set_baseline_stats() placement in save() and the unused _read_session_from_file method Important Files Changed
Sequence DiagramsequenceDiagram
participant C as Caller
participant S as UnifiedSessionStore
participant D as Disk (JSON file)
Note over C,D: Normal save flow (existing session)
C->>S: load(session_id)
S->>D: read + shared lock
D-->>S: session data
S->>S: from_dict() → set_baseline_stats()
S-->>C: "session (baseline = 5 requests)"
C->>C: "update_stats() → request_count = 6"
C->>S: save(session)
S->>S: set_baseline_stats() resets baseline to 6
S->>D: open + exclusive lock
S->>D: read on-disk state (5 requests)
S->>S: "_merge_sessions: delta = 6-6 = 0"
S->>D: write merged (5 requests — stat lost)
Note over C,D: Telegram user-auth flow (after fix)
participant TG as Telegram Update
participant B as TelegramBot
participant UH as UnknownUserHandler
TG->>B: process_inbound_telegram_message()
B->>B: "is_explicitly_allowed = bool(allowed_users) AND is_user_allowed()"
alt allowed_users is empty
B->>UH: handle(message, bot_context)
UH->>UH: check unknown_user_policy
alt "policy = deny"
UH-->>B: False → return None
else "policy = allow"
UH-->>B: True → continue
end
else user in allowlist
B->>B: "is_explicitly_allowed = True, skip UH"
end
|
| def _read_session_from_file(self, path: Path) -> Optional[UnifiedSession]: | ||
| """Read a session from disk without using the in-process cache.""" | ||
| if not path.exists(): | ||
| return None | ||
|
|
||
| try: | ||
| with open(path, 'rb') as f: | ||
| if sys.platform == "win32": | ||
| import msvcrt | ||
| f.seek(0) | ||
| msvcrt.locking(f.fileno(), msvcrt.LK_RLCK, 1) | ||
| try: | ||
| session = self._parse_session_file(f) | ||
| finally: | ||
| f.seek(0) | ||
| msvcrt.locking(f.fileno(), msvcrt.LK_UNLCK, 1) | ||
| elif _HAS_FCNTL: | ||
| fcntl.flock(f.fileno(), fcntl.LOCK_SH) | ||
| try: | ||
| session = self._parse_session_file(f) | ||
| finally: | ||
| fcntl.flock(f.fileno(), fcntl.LOCK_UN) | ||
| else: | ||
| session = self._parse_session_file(f) | ||
|
|
||
| return session | ||
| except Exception as e: | ||
| logger.error(f"Failed to read session file {path}: {e}") | ||
| return None |
There was a problem hiding this comment.
_read_session_from_file is dead code
This method is added in this PR but is never called anywhere — save() reads under the exclusive lock via _read_json_locked(), and load() also reads under a lock directly. If this was intended as a helper for one of those paths, the call site is missing. It should either be wired up or removed to avoid confusion.
Note: If this suggestion doesn't match your team's coding style, reply to this and let me know. I'll remember it for next time!
Summary
Critical bug scan found two high-severity issues in recent CLI/bot changes and fixes them with minimal, targeted changes.
1. UnifiedSessionStore concurrent write message loss
Impact: Silent conversation history loss when the same session is used across TUI +
--interactive, or multiple processes sharing~/.praison/sessions/.Root cause:
UnifiedSessionStorecached sessions in memory and performed full snapshot writes without reloading disk state under lock. Commit0aa0d5b5added file locking on write but not read-modify-write merging (unlikeDefaultSessionStore/HierarchicalSessionStore).Fix:
2. Telegram
unknown_user_policybypassImpact: Security/auth bypass — with empty
allowed_users, Telegram allowed all users even whenunknown_user_policywasdeny(the default) orpair. Discord/Slack already used the correct pattern.Root cause:
process_inbound_telegram_message()treated empty allowlist as explicitly allowed viais_user_allowed(), skippingUnknownUserHandler.Fix: Align with Discord/Slack:
is_explicitly_allowed = bool(allowed_users) and is_user_allowed(user_id)Validation
pytest tests/unit/cli/test_unified_session.py— includes newtest_stale_cache_write_preserves_concurrent_updatespytest tests/unit/gateway/test_telegram_security_pipeline.py— updated/added policy tests (33 passed)Summary by CodeRabbit